import backtrader as bt
import backtrader.feeds as btfeeds
import pandas as pd
import numpy as np
# ----------------------------------------------------
# 1) 多标的轮动策略(使用 bt.talib 的 SMA 打分)
# - 定期再平衡(rebalance_days)
# - 使用 order_target_percent 设置目标权重(非全买全卖)
# ----------------------------------------------------
class MultiAssetRotation(bt.Strategy):
    """Rotate capital into the data feeds with the strongest SMA ratio.

    On every ``rebalance_days``-th call to ``next`` each feed is scored as
    ``fast_sma / slow_sma - 1``. The ``top_n`` best-scoring feeds share
    ``max_gross_leverage`` in proportion to their positive scores; every
    other feed is targeted at a weight of 0. Positions are adjusted with
    ``order_target_percent`` rather than closed and reopened.
    """

    params = dict(
        fast_period=10,
        slow_period=40,
        top_n=2,  # number of top-ranked feeds to hold
        rebalance_days=5,  # rebalance on every N-th call to next()
        max_gross_leverage=1.0,  # cap on the sum of target weights (1.0 = 100%)
    )

    def __init__(self):
        # The SMAs come from bt.talib, so the TA-Lib package must be installed.
        self.sma_fast = {}
        self.sma_slow = {}
        for d in self.datas:
            self.sma_fast[d] = bt.talib.SMA(d.close, timeperiod=self.p.fast_period)
            self.sma_slow[d] = bt.talib.SMA(d.close, timeperiod=self.p.slow_period)
        self.day_count = 0

    def next(self):
        """Recompute target weights and submit rebalancing orders."""
        self.day_count += 1
        if self.day_count % self.p.rebalance_days != 0:
            return

        # Score = fast / slow - 1. Feeds whose SMAs are NaN or whose slow SMA
        # is zero cannot be scored and are left out of the ranking.
        scores = []
        for d in self.datas:
            fast = self.sma_fast[d][0]
            slow = self.sma_slow[d][0]
            if slow == 0 or np.isnan(fast) or np.isnan(slow):
                continue
            scores.append((d, fast / slow - 1.0))
        if not scores:
            return
        scores.sort(key=lambda x: x[1], reverse=True)
        winners = scores[: self.p.top_n]

        # Negative scores are clipped to zero; the remaining positive scores
        # are normalised so that the weights sum to max_gross_leverage. When
        # no winner has a positive score every target stays at zero.
        target_weights = {d: 0.0 for d in self.datas}
        raw = np.array([max(0.0, s) for _, s in winners], dtype=float)
        total = raw.sum()
        if total > 0:
            scaled = raw / total * self.p.max_gross_leverage
            for (d, _), w in zip(winners, scaled):
                target_weights[d] = float(w)

        # Submit the largest weight reductions first so that sell orders are
        # queued ahead of the buy orders that rely on the cash they release.
        portfolio_value = self.broker.getvalue()
        weight_change = {}
        for d in self.datas:
            held_value = self.getposition(d).size * d.close[0]
            current = held_value / portfolio_value if portfolio_value else 0.0
            weight_change[d] = target_weights[d] - current
        for d in sorted(self.datas, key=lambda feed: weight_change[feed]):
            self.order_target_percent(data=d, target=target_weights[d])

    def notify_order(self, order):
        """Report orders the broker refused instead of dropping them silently."""
        if order.status in [order.Margin, order.Rejected]:
            print(f"订单未成交: {order.data._name} 状态: {order.getstatusname()}")

    def notify_trade(self, trade):
        """Print gross and net (after commission) PnL when a trade closes."""
        if trade.isclosed:
            print(f"交易已平仓: {trade.data._name} PnL: {trade.pnl:.2f}, 净: {trade.pnlcomm:.2f}")
# ----------------------------------------------------
# 2) 生成多标的、分段趋势/震荡的模拟数据(非纯随机漫步)
# - 分段 drift(正/负/震荡)
# - 市场共同因子 + 个体噪声,形成相关性
# ----------------------------------------------------
def generate_multi_asset_data(n_assets=5, periods=500, seed=20251004):
    """Simulate daily OHLCV frames for several correlated assets.

    Log returns follow three consecutive regimes (about 20% / 30% / 50% of
    the sample) with different market drifts. Within a regime each return is
    drift + per-asset drift offset + ``phi`` * previous return + a market
    shock shared by all assets + an asset-specific shock.

    Args:
        n_assets: Number of frames to generate.
        periods: Number of business-day rows per frame.
        seed: Seed for ``numpy.random.default_rng``; equal seeds give equal
            output.

    Returns:
        A list of ``n_assets`` DataFrames indexed by a business-day
        ``datetime`` index starting 2020-01-01, with columns ``open``,
        ``high``, ``low``, ``close`` and ``volume``.
    """
    rng = np.random.default_rng(seed)
    dates = pd.date_range(start='2020-01-01', periods=periods, freq='B')

    # The last regime takes whatever the first two leave over, so the three
    # lengths add up to `periods` for every value, not only for those where
    # the truncations happen to agree.
    first_len = int(periods * 0.2)
    second_len = int(periods * 0.3)
    segments = [first_len, second_len, periods - first_len - second_len]
    market_drifts = [0.0008, -0.0005, 0.0002]
    market_vol = 0.01
    phi = 0.2  # mild autocorrelation so returns are coherent within a regime

    # Market shocks are drawn once and reused for every asset; this shared
    # component is what makes the simulated assets correlated.
    market_shocks = [rng.normal(0, 1, size=seg_len) for seg_len in segments]

    asset_dfs = []
    for _ in range(n_assets):
        id_drift_offsets = rng.normal(0.0, 0.0004, size=3)
        id_vol = 0.012 + 0.003 * rng.random()
        rets = []
        for seg_len, mdrift, off, eps in zip(
            segments, market_drifts, id_drift_offsets, market_shocks
        ):
            own_shocks = rng.normal(size=seg_len)
            innovations = (
                mdrift + off + market_vol * 0.5 * eps + id_vol * 0.5 * own_shocks
            )
            # AR(1) recursion; the previous return restarts at zero at the
            # beginning of each regime.
            r = np.zeros(seg_len)
            prev = 0.0
            for t in range(seg_len):
                prev = innovations[t] + phi * prev
                r[t] = prev
            rets.append(r)
        rets = np.concatenate(rets)

        close = 100.0 * np.exp(np.cumsum(rets))
        open_ = close * (1 + rng.normal(0, 0.001, size=periods))
        # High/low are pushed outwards from the open/close envelope so that
        # low <= min(open, close) and high >= max(open, close) on every row.
        high = np.maximum(open_, close) * (1 + rng.random(size=periods) * 0.002)
        low = np.minimum(open_, close) * (1 - rng.random(size=periods) * 0.002)
        volume = rng.integers(50_000, 200_000, size=periods).astype(int)

        df = pd.DataFrame(
            {'open': open_, 'high': high, 'low': low, 'close': close, 'volume': volume},
            index=dates,
        )
        df.index.name = 'datetime'
        asset_dfs.append(df)
    return asset_dfs
# ----------------------------------------------------
# 3) 回测入口
# ----------------------------------------------------
def run_backtest_multi():
    """Backtest the rotation strategy on simulated data and print a summary.

    Five simulated assets are fed to a Cerebro engine configured with
    100000.0 starting cash, a 0.001 commission and a ``Returns`` analyzer.

    Returns:
        The ``(cerebro, results)`` pair, where ``results`` is the value
        returned by ``cerebro.run()``.
    """
    frames = generate_multi_asset_data(n_assets=5, periods=500, seed=20251004)

    engine = bt.Cerebro()
    strategy_settings = dict(
        fast_period=10,
        slow_period=40,
        top_n=2,
        rebalance_days=5,
        max_gross_leverage=1.0,
    )
    engine.addstrategy(MultiAssetRotation, **strategy_settings)

    # Column mapping handed to every PandasData feed.
    column_map = dict(
        open='open',
        high='high',
        low='low',
        close='close',
        volume='volume',
        openinterest=-1,
    )
    for number, frame in enumerate(frames, start=1):
        feed = btfeeds.PandasData(dataname=frame, **column_map)
        engine.adddata(feed, name=f"Asset_{number}")

    engine.broker.setcash(100000.0)
    engine.broker.setcommission(commission=0.001)
    engine.addanalyzer(bt.analyzers.Returns, _name='returns')

    print(f"起始资金: {engine.broker.getvalue():.2f}")
    run_results = engine.run()
    ending_value = engine.broker.getvalue()
    stats = run_results[0].analyzers.returns.get_analysis()

    divider = "-" * 32
    print(divider)
    print(f"最终资金: {ending_value:.2f}")
    print(f"总收益(%) : {stats.get('rtot', 0.0) * 100:.2f}")
    print(f"年化(%) : {stats.get('rnorm100', 0.0):.2f}")
    print(divider)
    return engine, run_results
# Run the backtest directly when this code executes (convenient for a Quarto
# cell). Only the Cerebro instance is kept; the strategy results are discarded.
cerebro, _ = run_backtest_multi()